#!/usr/bin/env python3
#
# Copyright 2017-2018 Ettus Research, National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
N3XX Built-In Self Test (BIST)

Will work on all derivatives of the N3xx series.
"""

from __future__ import print_function
import sys
import time
from usrp_mpm import bist

# Timeout values are in seconds. They are only defaults: bist_gpsdo() reads
# the 'gps_warmup_timeout' and 'gps_lockok_timeout' entries of
# self.args.option first and falls back to these constants.
GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute"
GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test
                       # does not necessarily require GPS lock to pass, we
                       # reduce this value in order for the BIST to pass faster
                       # by default.

##############################################################################
# Bist class
##############################################################################
class N3XXBIST(bist.UsrpBIST):
    """
    BIST Tool for the USRP N3xx series

    Every bist_<name>() method implements one test and returns a tuple of
    (success, result dictionary).
    """
    # Device family identifier.
    # NOTE(review): presumably consumed by bist.UsrpBIST -- confirm.
    usrp_type = "N3XX"
    # This defines special tests that are really collections of other tests.
    # NOTE(review): "*" presumably selects every available test -- confirm in
    # bist.UsrpBIST.
    collections = {
        'standard': ["gpsdo", "rtc", "temp", "fan", "tpm"],
        'extended': "*",
    }
    # Default FPGA image type
    DEFAULT_FPGA_TYPE = 'HG'
    # Per-test result templates: for each test name, the keys of its result
    # dictionary together with a placeholder value (-1, "", [] or 0).
    # NOTE(review): presumably used by bist.UsrpBIST to emit results in a
    # fixed, LabVIEW-compatible layout -- confirm against the base class.
    lv_compat_format = {
        'ddr3': {
            'throughput': -1,
        },
        'gpsdo': {
            "class": "",
            "time": "",
            "ept": -1,
            "lat": -1,
            "lon": -1,
            "alt": -1,
            "epx": -1,
            "epy": -1,
            "epv": -1,
            "track": -1,
            "speed": -1,
            "climb": -1,
            "eps": -1,
            "mode": -1,
        },
        'tpm': {
            'tpm0_caps': "",
        },
        'sfp0_loopback': {
            'elapsed_time': -1,
            'max_roundtrip_latency': -1,
            'throughput': -1,
            'max_ber': -1,
            'errors': -1,
            'bits': -1,
        },
        'sfp1_loopback': {
            'elapsed_time': -1,
            'max_roundtrip_latency': -1,
            'throughput': -1,
            'max_ber': -1,
            'errors': -1,
            'bits': -1,
        },
        'qsfp_loopback': {
            'elapsed_time': -1,
            'max_roundtrip_latency': -1,
            'throughput': -1,
            'max_ber': -1,
            'errors': -1,
            'bits': -1,
        },
        'gpio': {
            'write_patterns': [],
            'read_patterns': [],
        },
        'temp': {
            'fpga-thermal-zone': -1,
        },
        'fan': {
            'cooling_device0': -1,
            'cooling_device1': -1,
        },
        'whiterabbit': {
            'lock_status': 0,
        },
    }
    # Device args string handed to bist.load_fpga_image() and
    # bist.run_aurora_bist() by the tests of this class.
    device_args = "type=n3xx,addr=127.0.0.1"

    def __init__(self):
        """Initialize the tool by running the bist.UsrpBIST initializer."""
        super(N3XXBIST, self).__init__()

    def get_mb_periph_mgr(self):
        """
        Return a reference to the n3xx periph manager, i.e. the `n3xx` object
        of usrp_mpm.periph_manager.n3xx. The import happens at call time.
        """
        from usrp_mpm.periph_manager.n3xx import n3xx as periph_mgr
        return periph_mgr

    def get_product_id(self):
        """
        Return the product ID of this device ('n300', 'n310' or 'n320').

        The product is looked up from the motherboard ID and the daughterboard
        ID, both obtained through bist.get_product_id_from_eeprom(). An empty
        daughterboard ID stands for a motherboard without a daughterboard.

        Raises a KeyError if the (mboard, dboard) pair is not a known product,
        e.g. an 'n300' motherboard combined with a 'rhodium' daughterboard.
        """
        mboard = bist.get_product_id_from_eeprom(
            valid_ids=['n300', 'n310'],
            cmd='eeprom-id',
        )
        dboard = bist.get_product_id_from_eeprom(
            valid_ids=['magnesium', 'rhodium', ''],
            cmd='db-id',
        )
        # Keys are (mboard, dboard) tuples, values are the product IDs
        known_products = {
            ('n300', 'magnesium'): 'n300',
            ('n300', ''):          'n300', # bare n300 motherboard
            ('n310', 'magnesium'): 'n310',
            ('n310', ''):          'n310', # bare n310 motherboard
            ('n310', 'rhodium'):   'n320',
        }
        return known_products[(mboard, dboard)]

#############################################################################
# BISTS
# All bist_* methods must return True/False success values!
#############################################################################
    def bist_ddr3(self):
        """
        BIST for PL DDR3 DRAM
        Description: Examines the speed of the DDR3 by means of
        bist.test_ddr3_with_usrp_probe(). The test is first tried on the FPGA
        image that is currently loaded. If that reports an error and the
        'skip_load_fpga' option is not set, the AA image is loaded and the
        test is run a second time.

        External Equipment: None

        Return dictionary:
        - throughput: The estimated throughput in bytes/s
        - error_msg: Only present on failure

        Return status:
        True if the DDR3 bist passed, i.e. the reported throughput is above
        1000e3
        """
        assert 'ddr3' in self.tests_to_run
        if self.args.dry_run:
            return True, {'throughput': 1250e6}

        def _to_status(probe_result):
            " Turn a probe result into a (pass/fail, result) tuple "
            return probe_result.get('throughput', 0) > 1000e3, probe_result

        # Attempt number one uses whatever image is loaded right now. This is
        # expected to fail if that image has no DmaFIFO block.
        first_try = bist.test_ddr3_with_usrp_probe()
        # A second attempt only makes sense if the first one errored out and
        # the user did not forbid loading another FPGA image.
        second_try_needed = 'error_msg' in first_try \
            and not self.args.option.get('skip_load_fpga', False)
        if not second_try_needed:
            return _to_status(first_try)
        # The AA image is the one where we think the DmaFIFO block is actually
        # instantiated. Flag the image for reloading before touching it.
        self.reload_fpga_image = True
        try:
            bist.load_fpga_image(
                'AA',
                self.device_args,
                self.get_product_id(),
            )
        except Exception as load_error:
            failure_info = {
                'throughput': 0,
                'error_msg': "Failed to load AA image: {}".format(
                    str(load_error)),
            }
            return False, failure_info
        return _to_status(bist.test_ddr3_with_usrp_probe())

    def bist_gpsdo(self):
        """
        BIST for GPSDO
        Description: Switches on the GPS, waits for it to leave its warm-up
        phase, gives it a short while to report a lock, and then returns the
        GPS information reported by gpsd.

        External Equipment: None; Recommend attaching an antenna or providing
                           fake GPS information

        Options (read from self.args.option):
        - gps_warmup_timeout: Seconds to wait for GPS-WARMUP to go low
                              (default: GPS_WARMUP_TIMEOUT)
        - gps_lockok_timeout: Seconds to wait for GPS-LOCKOK to go high
                              (default: GPS_LOCKOK_TIMEOUT)

        Return dictionary: A TPV dictionary as returned by gpsd.
        See also: http://www.catb.org/gpsd/gpsd_json.html

        Check for mode 2 or 3 to see if it's locked.

        Return status:
        True as soon as a TPV result was read back, with or without GPS lock.
        If GPS-WARMUP does not go low in time, a RuntimeError is raised.
        """
        assert 'gpsdo' in self.tests_to_run
        if self.args.dry_run:
            fake_tpv = {
                "class": "TPV",
                "time": "2017-04-30T11:48:20.10Z",
                "ept": 0.005,
                "lat": 30.407899,
                "lon": -97.726634,
                "alt": 1327.689,
                "epx": 15.319,
                "epy": 17.054,
                "epv": 124.484,
                "track": 10.3797,
                "speed": 0.091,
                "climb": -0.085,
                "eps": 34.11,
                "mode": 3
            }
            return True, fake_tpv
        from usrp_mpm.periph_manager import n3xx
        gps_gpio = n3xx.TCA6424(self.mb_rev)

        def _note(message):
            " Progress messages go to stderr "
            sys.stderr.write(message)

        def _warmup_is_over():
            " True once the GPS-WARMUP pin reads low "
            return not gps_gpio.get('GPS-WARMUP')

        def _lock_is_ok():
            " Current state of the GPS-LOCKOK pin "
            return gps_gpio.get('GPS-LOCKOK')

        # Power up the GPS and let it settle for a moment
        gps_gpio.set("PWREN-GPS")
        time.sleep(5)
        user_options = self.args.option
        warmup_secs = float(
            user_options.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT))
        lockok_secs = float(
            user_options.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT))
        # Stage 1: WARMUP has to go low, otherwise there is no point in
        # carrying on. poll_with_timeout() is given milliseconds.
        _note(
            "Waiting for WARMUP to go low for up to {} seconds...\n".format(
                warmup_secs))
        warmed_up = bist.poll_with_timeout(
            _warmup_is_over, warmup_secs*1000, 1000)
        if not warmed_up:
            raise RuntimeError(
                "GPS-WARMUP did not go low within {} seconds!".format(
                    warmup_secs))
        _note("Chip is warmed up.\n")
        # Stage 2: LOCKOK. The data sheet says a GPS lock can take up to 15
        # minutes; a missing lock is reported, but is not an error.
        _note(
            "Waiting for LOCKOK to go high for up to {} seconds...\n".format(
                lockok_secs))
        got_lock = bist.poll_with_timeout(
            _lock_is_ok, lockok_secs*1000, 1000)
        if not got_lock:
            _note("No GPS-LOCKOK!\n")
        # Report the remaining status pins for diagnostics
        _note("GPS-SURVEY status: {}\n".format(gps_gpio.get('GPS-SURVEY')))
        _note("GPS-PHASELOCK status: {}\n".format(
            gps_gpio.get('GPS-PHASELOCK')))
        _note("GPS-ALARM status: {}\n".format(gps_gpio.get('GPS-ALARM')))
        # With the chip up and running, fetch the TPV result. Getting past
        # this call means the chip responded with a valid result, which is not
        # the same as having a GPS lock.
        tpv_result = bist.get_gpsd_tpv_result()
        return True, tpv_result

    def bist_tpm(self):
        """
        BIST for TPM (Trusted Platform Module)

        Collects the caps value of the detected TPM devices through
        bist.get_tpm_caps_info().

        Return dictionary:
        - tpm<N>_caps: TPM manufacturer and version info. Is a multi-line
                       string.

        Return status: True if the result holds exactly one entry, i.e.
        exactly one TPM device is detected.
        """
        assert 'tpm' in self.tests_to_run
        if not self.args.dry_run:
            tpm_caps = bist.get_tpm_caps_info()
            exactly_one_tpm = len(tpm_caps) == 1
            return exactly_one_tpm, tpm_caps
        return True, {'tpm0_caps': "Fake caps value\n\nVersion 0.0.0"}

    def bist_ref_clock_int(self):
        """
        BIST for clock lock from internal (25 MHz) source.
        Description: Checks to see if the daughtercard can lock to an internal
        clock source. The lock information is queried through
        bist.get_ref_clock_prop() with 'internal' given for both of its
        source arguments and the 'skip_rfic' extra arg set.

        External Equipment: None
        Return dictionary:
        - <sensor-name>:
          - locked: Boolean lock status
        (The dry run returns {'ref_locked': True}.)

        There can be multiple ref lock sensors; for a pass condition they all
        need to be asserted.

        Return status:
        True if the result carries no 'error_msg' entry
        """
        assert 'ref_clock_int' in self.tests_to_run
        if self.args.dry_run:
            return True, {'ref_locked': True}
        source = 'internal'
        lock_info = bist.get_ref_clock_prop(
            source, source, extra_args={'skip_rfic': 1})
        passed = 'error_msg' not in lock_info
        return passed, lock_info

    def bist_ref_clock_ext(self):
        """
        BIST for clock lock from external source. Note: This test requires a
        connected daughterboard with a 'ref lock' sensor available.

        Description: Checks to see if the daughtercard can lock to the external
        reference clock. The lock information is queried through
        bist.get_ref_clock_prop() with 'external' given for both of its
        source arguments and the 'skip_rfic' extra arg set.

        External Equipment: 10 MHz reference Source connected to "ref in".

        Return dictionary:
        - <sensor-name>:
          - locked: Boolean lock status
        (The dry run returns {'ref_locked': True}.)

        There can be multiple ref lock sensors; for a pass condition they all
        need to be asserted.

        Return status:
        True if the result carries no 'error_msg' entry
        """
        assert 'ref_clock_ext' in self.tests_to_run
        if self.args.dry_run:
            return True, {'ref_locked': True}
        source = 'external'
        lock_info = bist.get_ref_clock_prop(
            source, source, extra_args={'skip_rfic': 1})
        passed = 'error_msg' not in lock_info
        return passed, lock_info

    def bist_ref_clock_gpsdo(self):
        """
        BIST for clock lock from the GPSDO source. Note: This test requires a
        connected daughterboard with a 'ref lock' sensor available.

        Description: Checks to see if the daughtercard can lock to the GPSDO
        reference clock. The lock information is queried through
        bist.get_ref_clock_prop() with 'gpsdo' given for both of its source
        arguments and the 'skip_rfic' extra arg set.

        External Equipment: None required by this code.
        NOTE(review): the GPSDO presumably needs to be powered and usable
        for this to pass -- confirm.

        Return dictionary:
        - <sensor-name>:
          - locked: Boolean lock status
        (The dry run returns {'ref_locked': True}.)

        There can be multiple ref lock sensors; for a pass condition they all
        need to be asserted.

        Return status:
        True if the result carries no 'error_msg' entry
        """
        assert 'ref_clock_gpsdo' in self.tests_to_run
        if self.args.dry_run:
            return True, {'ref_locked': True}
        source = 'gpsdo'
        lock_info = bist.get_ref_clock_prop(
            source, source, extra_args={'skip_rfic': 1})
        passed = 'error_msg' not in lock_info
        return passed, lock_info

    def bist_sfp0_loopback(self):
        """
        BIST for the SFP0 port:
        Description: Runs the Aurora BIST with 'misc-auro-regs0' as the master
        and no slave, i.e. the data is expected to be looped back into the
        same port.

        External Equipment: Loopback module in SFP0 required.

        Return dictionary:
        - elapsed_time: Float value, test time in seconds
        - max_roundtrip_latency: Float value, max roundtrip latency in seconds
        - throughput: Approximate data throughput in bytes/s
        - max_ber: Estimated maximum BER, float value.
        - errors: Number of errors
        - bits: Number of bits that were transferred
        """
        if self.args.dry_run:
            return True, bist.get_sfp_bist_defaults()
        aurora_kwargs = {
            'device_args': self.device_args,
            'product_id': self.get_product_id(),
            'master': 'misc-auro-regs0',
        }
        aurora_results = bist.run_aurora_bist(**aurora_kwargs)
        # The Aurora BIST ran, so request a reload of the FPGA image
        self.reload_fpga_image = True
        return bist.aurora_results_to_status(aurora_results)

    def bist_sfp1_loopback(self):
        """
        BIST for the SFP1 port:
        Description: Runs the Aurora BIST with 'misc-auro-regs1' as the master
        and no slave, i.e. the data is expected to be looped back into the
        same port.

        External Equipment: Loopback module in SFP1 required.

        Return dictionary:
        - elapsed_time: Float value, test time in seconds
        - max_roundtrip_latency: Float value, max roundtrip latency in seconds
        - throughput: Approximate data throughput in bytes/s
        - max_ber: Estimated maximum BER, float value.
        - errors: Number of errors
        - bits: Number of bits that were transferred
        """
        if self.args.dry_run:
            return True, bist.get_sfp_bist_defaults()
        aurora_kwargs = {
            'device_args': self.device_args,
            'product_id': self.get_product_id(),
            'master': 'misc-auro-regs1',
        }
        aurora_results = bist.run_aurora_bist(**aurora_kwargs)
        # The Aurora BIST ran, so request a reload of the FPGA image
        self.reload_fpga_image = True
        return bist.aurora_results_to_status(aurora_results)

    def bist_qsfp_loopback(self):
        """
        BIST for QSFP+ ports:
        Description: Tests individual quads of the QSFP+ connector. You need to
        provide `--option qsfp_port=X` to select the QSFP+ connector (X is
        0, 1, 2 or 3; the default is 0).

        External Equipment: Loopback module in QSFPX required

        Return dictionary:
        - elapsed_time: Float value, test time in seconds
        - max_roundtrip_latency: Float value, max roundtrip latency in seconds
        - throughput: Approximate data throughput in bytes/s
        - max_ber: Estimated maximum BER, float value.
        - errors: Number of errors
        - bits: Number of bits that were transferred

        Raises:
        - RuntimeError if no I2C adapter for the QSFP board is found
        - ValueError if qsfp_port is not an integer in the range 0..3
        """
        if self.args.dry_run:
            return True, bist.get_sfp_bist_defaults()
        from usrp_mpm.sys_utils import i2c_dev
        from usrp_mpm.periph_manager import n3xx
        qsfp_i2c = i2c_dev.of_get_i2c_adapter(n3xx.N32X_QSFP_I2C_LABEL)
        if qsfp_i2c is None:
            raise RuntimeError("Could not find QSFP board!")
        qsfp_port = int(self.args.option.get('qsfp_port', '0'))
        # The port number is user input, so it is validated with an explicit
        # exception: an assert would be stripped when running with -O.
        if qsfp_port not in range(4):
            raise ValueError(
                "Invalid qsfp_port: {} (valid values are 0 through 3)".format(
                    qsfp_port))
        aurora_regs_label = 'misc-auro-regs{}'.format(qsfp_port)
        # NOTE(review): unlike the SFP loopback tests, this test does not set
        # self.reload_fpga_image after the Aurora BIST -- confirm that this is
        # intentional.
        sfp_bist_results = bist.run_aurora_bist(
            device_args=self.device_args,
            # Note: We're overwriting the product ID here, because the detection
            # is currently limited to reading the Mboard EEPROM. However, only
            # the N320 has the QSFP board.
            product_id='n320',
            aurora_image_type='AQ',
            master=aurora_regs_label)
        return bist.aurora_results_to_status(sfp_bist_results)

    def bist_sfp_loopback(self):
        """
        BIST for SFP+ ports:
        Description: Uses one SFP+ port to test the other. Pipes data out
        through one SFP, back to the other. The Aurora BIST is run with
        'misc-auro-regs0' as the master and 'misc-auro-regs1' as the slave.

        External Equipment: Loopback cable between the two SFP+ ports
        required.

        Return dictionary:
        - elapsed_time: Float value, test time in seconds
        - max_roundtrip_latency: Float value, max roundtrip latency in seconds
        - throughput: Approximate data throughput in bytes/s
        - max_ber: Estimated maximum BER, float value.
        - errors: Number of errors
        - bits: Number of bits that were transferred
        """
        if self.args.dry_run:
            return True, bist.get_sfp_bist_defaults()
        aurora_kwargs = {
            'device_args': self.device_args,
            'product_id': self.get_product_id(),
            'master': 'misc-auro-regs0',
            'slave': 'misc-auro-regs1',
        }
        aurora_results = bist.run_aurora_bist(**aurora_kwargs)
        # The Aurora BIST ran, so request a reload of the FPGA image
        self.reload_fpga_image = True
        return bist.aurora_results_to_status(aurora_results)

    def bist_gpio(self):
        """
        BIST for GPIO
        Description: Writes patterns to the front-panel GPIO and reads them
        back, in two passes that use different data direction masks.

        Needed Equipment: External loopback as follows
            GPIO
            0<->6
            1<->7
            2<->8
            3<->9
            4<->10
            5<->11

        Return dictionary:
        - write_patterns: A list of patterns that were written
        - read_patterns: A list of patterns that were read back

        On a mismatch, both lists hold only the failing pattern and the value
        that was read back for it.
        """
        assert 'gpio' in self.tests_to_run
        # The FP-GPIO offers 12 programmable pins
        pin_count = 12
        test_patterns = range(64)
        if self.args.dry_run:
            return True, {
                'write_patterns': list(test_patterns),
                'read_patterns': list(test_patterns),
            }
        from usrp_mpm.periph_manager import n3xx, n3xx_periphs
        port_gpio = n3xx_periphs.TCA6424(self.mb_rev)
        port_gpio.set("FPGA-GPIO-EN")
        regs = n3xx_periphs.MboardRegsControl(
            n3xx.n3xx.mboard_regs_label, self.log)
        # Hand all 12 pins over to the PS
        regs.set_fp_gpio_master(0xFFF)
        # The front-panel GPIOs need a moment before they are usable
        time.sleep(.5)

        def _check_patterns(ddr):
            " Write every pattern with the given DDR and verify the readback "
            gpio_ctrl = n3xx_periphs.FrontpanelGPIO(ddr)
            for written in test_patterns:
                bist.gpio_set_all(gpio_ctrl, written, pin_count, ddr)
                time.sleep(0.1)
                read_back = gpio_ctrl.get_all()
                if written != read_back:
                    return False, {'write_patterns': [written],
                                   'read_patterns': [read_back]}
            return True, {'write_patterns': list(test_patterns),
                          'read_patterns': list(test_patterns)}

        # First pass: mask 0x03f (lower six pins), second pass: mask 0xfc0
        # (upper six pins). The second pass only runs if the first one passed.
        # NOTE(review): presumably a set DDR bit marks a pin as an output --
        # confirm against FrontpanelGPIO.
        outcome = None
        for ddr_mask in (0x03f, 0xfc0):
            outcome = _check_patterns(ddr_mask)
            if not outcome[0]:
                break
        return outcome

    def bist_temp(self):
        """
        BIST for temperature sensors
        Description: Reads the temperature sensors on the motherboards and
        returns their values in mC. The sensors are named after their 'type'
        attribute.

        Return dictionary:
        - <thermal-zone-name>: temp in mC
        - error_msg: Only present on failure, e.g. if no sensor was found

        Return status:
        True if the result carries no 'error_msg' entry
        """
        assert 'temp' in self.tests_to_run
        if self.args.dry_run:
            return True, {'fpga-thermal-zone': 30000}

        def _zone_name(device):
            " The 'type' attribute of a device, decoded as ASCII "
            return device.attributes.get('type').decode('ascii')

        readings = bist.get_temp_sensor_value(_zone_name)
        if len(readings) == 0:
            readings['error_msg'] = "No temperature sensors found!"
        passed = 'error_msg' not in readings
        return passed, readings

    def bist_fan(self):
        """
        BIST for fans
        Description: Reads the RPM values of the fans on the motherboard

        Return dictionary:
        - <fan-name>: Fan speed in RPM

        External Equipment: None

        Return status:
        True if exactly two fan values were read
        """
        assert 'fan' in self.tests_to_run
        if not self.args.dry_run:
            fan_speeds = bist.get_fan_values()
            both_fans_found = len(fan_speeds) == 2
            return both_fans_found, fan_speeds
        return True, {'cooling_device0': 10000, 'cooling_device1': 10000}

    def bist_whiterabbit(self):
        """
        BIST for White Rabbit.
        Description: Checks if the WR core can lock. If no UIO device for the
        WR registers is found, the WX FPGA image is loaded first. The time
        source is then set to 'sfp0', and the WR time lock status is polled
        once per second for up to 40 seconds.

        External Equipment: A WR source needs to be connected to SFP0.

        Return dictionary:
        - lock_status: The polling result converted with int() (nonzero when
                       the WR core reported a lock). The dry run reports 1.

        Return status:
        True if the WR core locked within the timeout
        """
        assert 'whiterabbit' in self.tests_to_run
        if self.args.dry_run:
            # Use the same key as the real result and as lv_compat_format
            return True, {'lock_status': 1}
        from usrp_mpm.cores import WhiteRabbitRegsControl
        from usrp_mpm.periph_manager.n3xx import n3xx
        from usrp_mpm.periph_manager import n3xx_periphs
        from usrp_mpm.sys_utils import uio
        if not uio.find_uio_device(n3xx.wr_regs_label, logger=self.log)[0]:
            self.log.info("Need to load WX image before proceeding...")
            bist.load_fpga_image(
                'WX',
                self.device_args,
                self.get_product_id(),
            )
            self.log.info("Image loading complete.")
        self.reload_fpga_image = True
        mb_regs = n3xx_periphs.MboardRegsControl(
            n3xx.mboard_regs_label, self.log)
        # NOTE(review): 25e6 is presumably the reference clock frequency in
        # Hz -- confirm against MboardRegsControl.set_time_source().
        mb_regs.set_time_source('sfp0', 25e6)
        wr_regs_control = WhiteRabbitRegsControl(
            n3xx.wr_regs_label, self.log)
        lock_status = bist.poll_with_timeout(
            wr_regs_control.get_time_lock_status,
            40000, # Try for x ms... this number is set from a few benchtop tests
            1000, # Poll every... second! why not?
        )
        result = {
            'lock_status': int(lock_status),
        }
        return lock_status, result

##############################################################################
# main
##############################################################################
def main():
    " Create the N3xx BIST tool, run it, and hand back whatever run() returns "
    bist_tool = N3XXBIST()
    return bist_tool.run()

if __name__ == '__main__':
    # The result of main() is inverted so that a truthy (passing) result maps
    # to exit status 0. sys.exit() is used instead of the exit() helper: the
    # latter is only injected by the site module and is not available under
    # `python -S` or in frozen/embedded interpreters.
    sys.exit(not main())
